
数据库中间件 MyCAT源码分析 —— XA分布式事务

2017-07-15

MyCat-Server 带注释代码地址 :https://github.com/YunaiV/Mycat-Server
  • 1. 概述

  • 2. XA 概念

  • 3. MyCAT 代码实现

    • 3.1 JDBC Demo 代码

    • 3.2 MyCAT 开启 XA 事务

    • 3.3 MyCAT 接收 SQL

    • 3.4 MySQL 接收 COMMIT

      • 3.4.1 单节点事务 or 多节点事务

      • 3.4.2 协调日志

      • 3.4.3 MultiNodeCoordinator

      • 3.5 MyCAT 启动回滚 XA事务

  • 4. MyCAT 实现缺陷

    • 4.1 协调日志写入性能

    • 4.2 数据节点未全部 PREPARE 就进行 COMMIT

    • 4.3 MyCAT 启动回滚 PREPARE 的 XA事务

    • 4.4 单节点事务未记录协调日志

    • 4.5 XA COMMIT 部分节点挂了重新恢复后,未进一步处理

  • 5. 彩蛋

1. 概述

数据库拆分后,业务上会碰到需要分布式事务的场景。MyCAT 基于 XA 实现分布式事务。国内目前另外一款很火的数据库中间件 Sharding-JDBC 准备基于 TCC 实现分布式事务。


  1. XA 概念简述

  2. MyCAT 代码如何实现 XA

  3. MyCAT 在实现 XA 存在的一些缺陷

2. XA 概念

> X/Open 组织(即现在的 Open Group )定义了分布式事务处理模型。 X/Open DTP 模型( 1994 )包括:

  1. 应用程序( AP )

  2. 事务管理器( TM )

  3. 资源管理器( RM )

  4. 通信资源管理器( CRM )
    一般,常见的事务管理器( TM )是交易中间件,常见的资源管理器( RM )是数据库,常见的通信资源管理器( CRM)是消息中间件,下图是X/Open DTP模型:


  1. 配置 TM ,通过 TM 或者 RM 提供的方式,把 RM 注册到 TM。可以理解为给 TM 注册 RM 作为数据源。一个 TM 可以注册多个 RM

  2. AP 从 TM 获取资源管理器的代理(例如:使用JTA接口,从TM管理的上下文中,获取出这个TM所管理的RM的JDBC连接或JMS连接)
    AP 向 TM 发起一个全局事务。这时,TM 会通知各个 RMXID(全局事务ID)会通知到各个RM。

  3. AP 通过 TM 中获取的连接,间接操作 RM 进行业务操作。这时,TM 在每次 AP 操作时把 XID(包括所属分支的信息)传递给 RMRM 正是通过这个 XID 关联来操作和事务的关系的。

  4. AP 结束全局事务时,TM 会通知 RM 全局事务结束。开始二段提交,也就是prepare - commit的过程。

XA协议指的是TM(事务管理器)和RM(资源管理器)之间的接口。目前主流的关系型数据库产品都是实现了XA接口的。JTA(Java Transaction API)是符合X/Open DTP模型的,事务管理器和资源管理器之间也使用了XA协议。 本质上也是借助两阶段提交协议来实现分布式事务的,下面分别来看看XA事务成功和失败的模型图:

😈 看到这里是不是有种黑人问号的感觉?淡定!我们接下来看 MyCAT 代码层面是如何实现 XA 的。另外,有兴趣对概念了解更多的,可以参看如下文章:

  1. 《XA事务处理》

  2. 《XA Transaction SQL Syntax》

  3. 《MySQL XA 事务支持调研》

3. MyCAT 代码实现

  • MyCAT :TM,协调者。

  • 数据节点 :RM,参与者。

3.1 JDBC Demo 代码

  1. public class MyCATXAClientDemo {

  2.    public static void main(String[] args) throws ClassNotFoundException, SQLException {

  3.        // 1. 获得数据库连接

  4.        Class.forName("com.mysql.jdbc.Driver");

  5.        Connection conn = DriverManager.getConnection("jdbc:mysql://", "root", "123456");

  6.        conn.setAutoCommit(false);

  7.        // 2. 开启 MyCAT XA 事务

  8.        conn.prepareStatement("set xa=on").execute();

  9.        // 3. 插入 SQL

  10.        // 3.1 SQL1 A库

  11.        long uid = Math.abs(new Random().nextLong());

  12.        String username = UUID.randomUUID().toString();

  13.        String password = UUID.randomUUID().toString();

  14.        String sql1 = String.format("insert into t_user(id, username, password) VALUES (%d, '%s', '%s')",

  15.                uid, username, password);

  16.        conn.prepareStatement(sql1).execute();

  17.        // 3.2 SQL2 B库

  18.        long orderId = Math.abs(new Random().nextLong());

  19.        String nickname = UUID.randomUUID().toString();

  20.        String sql2 = String.format("insert into t_order(id, uid, nickname) VALUES(%d, %s, '%s')", orderId, uid, nickname);

  21.        conn.prepareStatement(sql2).execute();

  22.        // 4. 提交 XA 事务

  23.        conn.commit();

  24.    }

  25. }

  • setxa=on MyCAT 开启 XA 事务。

  • conn.commit 提交 XA 事务。

3.2 MyCAT 开启 XA 事务

当 MyCAT 接收到 setxa=on 命令时,开启 XA 事务,并生成 XA 事务编号。XA 事务编号生成算法为 UUID。核心代码如下:

  1. // SetHandler.java

  2. public static void handle(String stmt, ServerConnection c, int offset) {

  3.        int rs = ServerParseSet.parse(stmt, offset);

  4.        switch (rs & 0xff) {

  5.        // ... 省略代码

  6.        case XA_FLAG_ON: {

  7.            if (c.isAutocommit()) {

  8.                c.writeErrMessage(ErrorCode.ERR_WRONG_USED, "set xa cmd on can't used in autocommit connection ");

  9.                return;

  10.            }

  11.            c.getSession2().setXATXEnabled(true);

  12.            c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));

  13.            break;

  14.        }

  15.        case XA_FLAG_OFF: {

  16.            c.writeErrMessage(ErrorCode.ERR_WRONG_USED,

  17.                    "set xa cmd off not for external use ");

  18.            return;

  19.        }

  20.        // ... 省略代码

  21.    }

  22. }

  23. // NonBlockingSession.java

  24. public void setXATXEnabled(boolean xaTXEnabled) {

  25.   if (xaTXEnabled) {

  26.       if (this.xaTXID == null) {

  27.           xaTXID = genXATXID(); // 😈😈😈获得 XA 事务编号

  28.       }

  29.   } else {

  30.       this.xaTXID = null;

  31.   }

  32. }

  33. private String genXATXID() {

  34.   return MycatServer.getInstance().getXATXIDGLOBAL();

  35. }

  36. // MycatServer.java

  37. public String getXATXIDGLOBAL() {

  38.   return "'" + getUUID() + "'";

  39. }

  40. public static String getUUID() { // 😈😈😈

  41.   String s = UUID.randomUUID().toString();

  42.   return s.substring(0, 8) + s.substring(9, 13) + s.substring(14, 18) + s.substring(19, 23) + s.substring(24);

  43. }

3.3 MyCAT 接收 SQL

此处 SQL 指的是 insertupdatedelete 操作。

当向某个数据节点第一次发起 SQL 时,会在 SQL 前面附加 XA START'xaTranId',并设置该数据节点连接事务状态为 TxState.TX_STARTED_STATE分布式事务状态,下文会专门整理)。核心代码如下:

  1. // MySQLConnection.java

  2. private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,

  3.                                 int clientCharSetIndex, int clientTxIsoLation,

  4.                                 boolean clientAutoCommit) {

  5.   String xaCmd = null;

  6.   boolean conAutoComit = this.autocommit;

  7.   String conSchema = this.schema;

  8.   // never executed modify sql,so auto commit

  9.   boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB() || clientAutoCommit;

  10.   if (expectAutocommit == false && xaTxID != null && xaStatus == TxState.TX_INITIALIZE_STATE) { // 😈😈😈

  11.       xaCmd = "XA START " + xaTxID + ';';

  12.       this.xaStatus = TxState.TX_STARTED_STATE;

  13.   }

  14.   // .... 省略代码

  15.   StringBuilder sb = new StringBuilder();

  16.   // .... 省略代码

  17.   if (xaCmd != null) {

  18.       sb.append(xaCmd);

  19.   }

  20.   // and our query sql to multi command at last

  21.   sb.append(rrn.getStatement() + ";");

  22.   // syn and execute others

  23.   this.sendQueryCmd(sb.toString());

  24. }

举个 变量 sb 的例子:

  1. SET names utf8;SET autocommit=0;XA START '1f2da7353e8846e5833b8d8dd041cfb1','db2';insert into t_user(id, username, password) VALUES (3400, 'b7c5ec1f-11cc-4599-851c-06ad617fec42', 'd2694679-f6a2-4623-a339-48d4a868be90');


3.4.1 单节点事务 or 多节点事务

COMMIT 执行时,MyCAT 会判断 XA 事务里,涉及到的数据库节点数量。

  • 如果节点数量为 1,单节点事务,使用 CommitNodeHandler 处理。

  • 如果节点数量 > 1,多节点事务,使用 MultiNodeCoordinator 处理。

CommitNodeHandler 相比 MultiNodeCoordinator 来说,只有一个数据节点,不需要进行多节点协调,逻辑会相对简单,有兴趣的同学可以另外看。我们主要分析 MultiNodeCoordinator

3.4.2 协调日志

协调日志,记录协调过程中各数据节点 XA 事务状态,处理MyCAT异常奔溃或者数据节点部分XA COMMIT,另外部分 XA PREPARE下的状态恢复。

XA 事务共有种








  1. CoordinatorLogEntry :协调者日志

  2. ParticipantLogEntry :参与者日志。此处,数据节点扮演参与者的角色。下文中,可能会出现参与者与数据节点混用的情况,望见谅。

一次 XA 事务,对应一条 CoordinatorLogEntry。一条 CoordinatorLogEntry 包含 N条 ParticipantLogEntry。 核心代码如下:

  1. // CoordinatorLogEntry :协调者日志

  2. public class CoordinatorLogEntry implements Serializable {

  3.    /**

  4.     * XA 事务编号

  5.     */

  6.    public final String id;

  7.    /**

  8.     * 参与者日志数组

  9.     */

  10.    public final ParticipantLogEntry[] participants;

  11. }

  12. // ParticipantLogEntry :参与者日志

  13. public class ParticipantLogEntry implements Serializable {

  14.    /**

  15.     * XA 事务编号

  16.     */

  17.    public String coordinatorId;

  18.    /**

  19.     * 数据库 uri

  20.     */

  21.    public String uri;

  22.    /**

  23.     * 过期描述

  24.     */

  25.    public long expires;

  26.    /**

  27.     * XA 事务状态

  28.     */

  29.    public int txState;

  30.    /**

  31.     * 参与者名字

  32.     */

  33.    public String resourceName;

  34. }

MyCAT 记录协调日志以 JSON格式 到文件每行包含一条 CoordinatorLogEntry。举个例子:

  1. {"id":"'e827b3fe666c4d968961350d19adda31'","participants":[{"uri":"","state":"3","expires":0,"resourceName":"db3"},{"uri":"","state":"3","expires":0,"resourceName":"db1"}]}

  2. {"id":"'f00b61fa17cb4ec5b8264a6d82f847d0'","participants":[{"uri":"","state":"3","expires":0,"resourceName":"db2"},{"uri":"","state":"3","expires":0,"resourceName":"db1"}]}


  1. // XA 协调者日志 存储接口:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/Repository.java

  2. public interface Repository {}

  3. // XA 协调者日志 文件存储:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/impl/FileSystemRepository.java

  4. public class FileSystemRepository implements Repository {}

  5. // XA 协调者日志 文件存储:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/backend/mysql/xa/recovery/impl/InMemoryRepository.java

  6. public class InMemoryRepository implements Repository {}

目前日志文件写入的方式性能较差,这里我们不做分析,在【4. MyCAT 实现缺陷】里一起讲。

3.4.3 MultiNodeCoordinator


第一阶段:发起 PREPARE。

  1. public void executeBatchNodeCmd(SQLCtrlCommand cmdHandler) {

  2.   this.cmdHandler = cmdHandler;

  3.   final int initCount = session.getTargetCount();

  4.   runningCount.set(initCount);

  5.   nodeCount = initCount;

  6.   failed.set(false);

  7.   faileCount.set(0);

  8.   //recovery nodes log

  9.   ParticipantLogEntry[] participantLogEntry = new ParticipantLogEntry[initCount];

  10.   // 执行

  11.   int started = 0;

  12.   for (RouteResultsetNode rrn : session.getTargetKeys()) {

  13.       if (rrn == null) {

  14.           continue;

  15.       }

  16.       final BackendConnection conn = session.getTarget(rrn);

  17.       if (conn != null) {

  18.           conn.setResponseHandler(this);

  19.           //process the XA_END XA_PREPARE Command

  20.           MySQLConnection mysqlCon = (MySQLConnection) conn;

  21.           String xaTxId = null;

  22.           if (session.getXaTXID() != null) {

  23.               xaTxId = session.getXaTXID() + ",'" + mysqlCon.getSchema() + "'";

  24.           }

  25.           if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) { // XA 事务

  26.               //recovery Log

  27.               participantLogEntry[started] = new ParticipantLogEntry(xaTxId, conn.getHost(), 0, conn.getSchema(), ((MySQLConnection) conn).getXaStatus());

  28.               String[] cmds = new String[]{"XA END " + xaTxId, // XA END 命令

  29.                       "XA PREPARE " + xaTxId}; // XA PREPARE 命令

  30.               mysqlCon.execBatchCmd(cmds);

  31.           } else { // 非 XA 事务

  32.               // recovery Log

  33.               participantLogEntry[started] = new ParticipantLogEntry(xaTxId, conn.getHost(), 0, conn.getSchema(), ((MySQLConnection) conn).getXaStatus());

  34.               cmdHandler.sendCommand(session, conn);

  35.           }

  36.           ++started;

  37.       }

  38.   }

  39.   // xa recovery log

  40.   if (session.getXaTXID() != null) {

  41.       CoordinatorLogEntry coordinatorLogEntry = new CoordinatorLogEntry(session.getXaTXID(), false, participantLogEntry);

  42.       inMemoryRepository.put(session.getXaTXID(), coordinatorLogEntry);

  43.       fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());

  44.   }

  45.   if (started < nodeCount) { // TODO 疑问:如何触发

  46.       runningCount.set(started);

  47.       LOGGER.warn("some connection failed to execute " + (nodeCount - started));

  48.       /**

  49.        * assumption: only caused by front-end connection close. <br/>

  50.        * Otherwise, packet must be returned to front-end

  51.        */

  52.       failed.set(true);

  53.   }

  54. }

  • 向各数据节点发送 XAEND + XA PREPARE 指令。举个 变量 cmds 例子:

  1. XA END '4cbb18214d0b47adbdb0658598666677','db3';XA PREPARE '4cbb18214d0b47adbdb0658598666677','db3';

  • 记录协调日志。每条参与者日志状态为 TxState.TX_STARTED_STATE

第二阶段:发起 COMMIT。

  1. @Override

  2. public void okResponse(byte[] ok, BackendConnection conn) {

  3.   // process the XA Transatcion 2pc commit

  4.   if (conn instanceof MySQLConnection) {

  5.       MySQLConnection mysqlCon = (MySQLConnection) conn;

  6.       switch (mysqlCon.getXaStatus()) {

  7.           case TxState.TX_STARTED_STATE:

  8.               //if there have many SQL execute wait the okResponse,will come to here one by one

  9.               //should be wait all nodes ready ,then send xa commit to all nodes.

  10.               if (mysqlCon.batchCmdFinished()) {

  11.                   String xaTxId = session.getXaTXID();

  12.                   String cmd = "XA COMMIT " + xaTxId + ",'" + mysqlCon.getSchema() + "'";

  13.                   if (LOGGER.isDebugEnabled()) {

  14.                       LOGGER.debug("Start execute the cmd :" + cmd + ",current host:" + mysqlCon.getHost() + ":" + mysqlCon.getPort());

  15.                   }

  16.                   // recovery log

  17.                   CoordinatorLogEntry coordinatorLogEntry = inMemoryRepository.get(xaTxId);

  18.                   for (int i = 0; i < coordinatorLogEntry.participants.length; i++) {

  19.                       LOGGER.debug("[In Memory CoordinatorLogEntry]" + coordinatorLogEntry.participants[i]);

  20.                       if (coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())) {

  21.                           coordinatorLogEntry.participants[i].txState = TxState.TX_PREPARED_STATE;

  22.                       }

  23.                   }

  24.                   inMemoryRepository.put(xaTxId, coordinatorLogEntry);

  25.                   fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());

  26.                   // send commit

  27.                   mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);

  28.                   mysqlCon.execCmd(cmd);

  29.               }

  30.               return;

  31.           case TxState.TX_PREPARED_STATE: {

  32.               // recovery log

  33.               String xaTxId = session.getXaTXID();

  34.               CoordinatorLogEntry coordinatorLogEntry = inMemoryRepository.get(xaTxId);

  35.               for (int i = 0; i < coordinatorLogEntry.participants.length; i++) {

  36.                   if (coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())) {

  37.                       coordinatorLogEntry.participants[i].txState = TxState.TX_COMMITED_STATE;

  38.                   }

  39.               }

  40.               inMemoryRepository.put(xaTxId, coordinatorLogEntry);

  41.               fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());

  42.               // XA reset status now

  43.               mysqlCon.setXaStatus(TxState.TX_INITIALIZE_STATE);

  44.               break;

  45.           }

  46.           default:

  47.       }

  48.   }

  49.   // 释放连接

  50.   if (this.cmdHandler.relaseConOnOK()) {

  51.       session.releaseConnection(conn);

  52.   } else {

  53.       session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false);

  54.   }

  55.   // 是否所有节点都完成commit,如果是,则返回Client 成功

  56.   if (this.finished()) {

  57.       cmdHandler.okResponse(session, ok);

  58.       if (cmdHandler.isAutoClearSessionCons()) {

  59.           session.clearResources(false);

  60.       }

  61.       /* 1.  事务提交后,xa 事务结束   */

  62.       if (session.getXaTXID() != null) {

  63.           session.setXATXEnabled(false);

  64.       }

  65.       /* 2. preAcStates 为true,事务结束后,需要设置为true。preAcStates 为ac上一个状态    */

  66.       if (session.getSource().isPreAcStates()) {

  67.           session.getSource().setAutocommit(true);

  68.       }

  69.   }

  70. }

  • mysqlCon.batchCmdFinished() 每个数据节点,第一次返回的是 XAEND 成功,第二次返回的是 XA PREPARE。在 XA PREPARE 成功后,记录该数据节点的参与者日志状态为 TxState.TX_PREPARED_STATE。之后,向该数据节点发起 XA COMMIT 命令。

  • XA COMMIT 返回成功后,记录该数据节点的事务参与者日志状态为 TxState.TX_COMMITED_STATE

  • 当所有数据节点(参与者)都执行完成 XA COMMIT 返回,即 this.finished()==true,返回 MySQL Client XA 事务提交成功。

[x] XA PREPAREXA COMMIT,数据节点可能返回失败,目前暂时没模拟出来,对应方法为 #errorResponse(....)

3.5 MyCAT 启动回滚 XA事务

MyCAT 启动时,会回滚处于TxState.TXPREPAREDSTATEParticipantLogEntry 对应的数据节点的 XA 事务。代码如下:

  1. // MycatServer.java

  2. private void performXARecoveryLog() {

  3.   // fetch the recovery log

  4.   CoordinatorLogEntry[] coordinatorLogEntries = getCoordinatorLogEntries();

  5.   for (int i = 0; i < coordinatorLogEntries.length; i++) {

  6.       CoordinatorLogEntry coordinatorLogEntry = coordinatorLogEntries[i];

  7.       boolean needRollback = false;

  8.       for (int j = 0; j < coordinatorLogEntry.participants.length; j++) {

  9.           ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j];

  10.           if (participantLogEntry.txState == TxState.TX_PREPARED_STATE) {

  11.               needRollback = true;

  12.               break;

  13.           }

  14.       }

  15.       if (needRollback) {

  16.           for (int j = 0; j < coordinatorLogEntry.participants.length; j++) {

  17.               ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j];

  18.               //XA rollback

  19.               String xacmd = "XA ROLLBACK " + coordinatorLogEntry.id + ';';

  20.               OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(new String[0], new XARollbackCallback());

  21.               outloop:

  22.               for (SchemaConfig schema : MycatServer.getInstance().getConfig().getSchemas().values()) {

  23.                   for (TableConfig table : schema.getTables().values()) {

  24.                       for (String dataNode : table.getDataNodes()) {

  25.                           PhysicalDBNode dn = MycatServer.getInstance().getConfig().getDataNodes().get(dataNode);

  26.                           if (dn.getDbPool().getSource().getConfig().getIp().equals(participantLogEntry.uri)

  27.                                   && dn.getDatabase().equals(participantLogEntry.resourceName)) {

  28.                               //XA STATE ROLLBACK

  29.                               participantLogEntry.txState = TxState.TX_ROLLBACKED_STATE;

  30.                               SQLJob sqlJob = new SQLJob(xacmd, dn.getDatabase(), resultHandler, dn.getDbPool().getSource());

  31.                               sqlJob.run();

  32.                               break outloop;

  33.                           }

  34.                       }

  35.                   }

  36.               }

  37.           }

  38.       }

  39.   }

  40.   // init into in memory cached

  41.   for (int i = 0; i < coordinatorLogEntries.length; i++) {

  42.  MultiNodeCoordinator.inMemoryRepository.put(coordinatorLogEntries[i].id, coordinatorLogEntries[i]);

  43.   }

  44.   // discard the recovery log

  45.    MultiNodeCoordinator.fileRepository.writeCheckpoint(MultiNodeCoordinator.inMemoryRepository.getAllCoordinatorLogEntries());

  46. }

4. MyCAT 实现缺陷

MyCAT 1.6.5 版本实现弱XA事务,相对来说,笔者认为距离实际生产使用存在一些差距。下面罗列可能存在的缺陷,如有错误,麻烦指出。🙂希望 MyCAT 在分布式事务的实现上,能够越来越给力。

4.1 协调日志写入性能

1、 CoordinatorLogEntryParticipantLogEntry 在每次写入文件时,是将内存中所有的日志全部重新写入,导致写入性能随着 XA 事务次数的增加,性能会越来越糟糕,导致 XA 事务整体性能会非常差。另外,该方法是同步的,也加大了写入的延迟。

建议:先获得可写入文件的 OFFSET,写入协调日志到文件,内存维护好 XA事务编号 与 OFFSET 的映射关系,从而实现顺序写入 + 并行写入


建议:已完全回滚或者提交的协调日志不放入内存。另外有文件存储好 XA事务编号 与 OFFSET 的映射关系。



PS:有兴趣的同学可以看下 RocketMQCommitLog 的存储,性能上很赞!

4.2 数据节点未全部 PREPARE 就进行 COMMIT

XA 事务定义,需要等待所有参与者全部 XA PREPARE 成功完成后发起 XA COMMIT。目前 MyCAT 是某个数据节点 XA PREPARE 完成后立即进行 XA COMMIT。比如说:第一个数据节点提交了 XAEND;XA PREPARE 时,第二个数据节在进行 XAEND;XA PREAPRE; 前挂了,第一个节点依然会 XA COMMIT 成功。

建议:按照严格的 XA 事务定义。

4.3 MyCAT 启动回滚 PREPARE 的 XA事务

1、MyCAT 启动时,回滚所有的 PREPARE 的 XA 事务,可能某个 XA 事务,部分 COMMIT,部分 PREPARE。此时直接回滚,会导致数据不一致。

建议:当判断到某个 XA 事务存在 PREPARE 的参与者,同时判断该 XA 事务里其他参与者的事务状态以及数据节点里 XA 事务状态,比如参与者为 MySQL时,可以使用 XA RECOVER 查询处于 PREPARE 所有的 XA 事务。

2、回滚 PREPARE 是异步进行的,在未进行完成时已经设置文件里回滚成功。如果异步过程中失败,会导致 XA 事务状态不一致。

建议:回调成功后,更新该 XA 事务状态。

4.4 单节点事务未记录协调日志

该情况较为极端。发起 XA PREPARE完后,MyCAT 挂了。重启后,该 XA 事务在 MyCAT 里就“消失“了,参与者的该 XA 事务一直处于 PREPARE 状态。从理论上来说,需要回滚该 XA 事务。


4.5 XA COMMIT 部分节点挂了重新恢复后,未进一步处理

当一部分节点 XA COMMIT 完成,另外一部分此时挂了。在管理员重启挂掉的节点,其对应的 XA 事务未进一步处理,导致数据不一致。


5. 彩蛋


